home *** CD-ROM | disk | FTP | other *** search
/ PC World Komputer 2010 April / PCWorld0410.iso / hity wydania / Ubuntu 9.10 PL / karmelkowy-koliberek-9.10-netbook-remix-PL.iso / casper / filesystem.squashfs / usr / lib / python2.6 / multiprocessing / pool.pyc (.txt) < prev    next >
Python Compiled Bytecode  |  2009-11-11  |  19KB  |  636 lines

  1. # Source Generated with Decompyle++
  2. # File: in.pyc (Python 2.6)
  3.  
  4. __all__ = [
  5.     'Pool']
  6. import threading
  7. import Queue
  8. import itertools
  9. import collections
  10. import time
  11. from multiprocessing import Process, cpu_count, TimeoutError
  12. from multiprocessing.util import Finalize, debug
  13. RUN = 0
  14. CLOSE = 1
  15. TERMINATE = 2
  16. job_counter = itertools.count()
  17.  
  18. def mapstar(args):
  19.     return map(*args)
  20.  
  21.  
  22. def worker(inqueue, outqueue, initializer = None, initargs = ()):
  23.     put = outqueue.put
  24.     get = inqueue.get
  25.     if hasattr(inqueue, '_writer'):
  26.         inqueue._writer.close()
  27.         outqueue._reader.close()
  28.     
  29.     if initializer is not None:
  30.         initializer(*initargs)
  31.     
  32.     while None:
  33.         
  34.         try:
  35.             task = get()
  36.         except (EOFError, IOError):
  37.             debug('worker got EOFError or IOError -- exiting')
  38.             break
  39.  
  40.         if task is None:
  41.             debug('worker got sentinel -- exiting')
  42.             break
  43.         
  44.         (job, i, func, args, kwds) = task
  45.         
  46.         try:
  47.             result = (True, func(*args, **kwds))
  48.         except Exception:
  49.             e = None
  50.             result = (False, e)
  51.  
  52.         continue
  53.         return None
  54.  
  55.  
  56. class Pool(object):
  57.     '''
  58.     Class which supports an async version of the `apply()` builtin
  59.     '''
  60.     Process = Process
  61.     
  62.     def __init__(self, processes = None, initializer = None, initargs = ()):
  63.         self._setup_queues()
  64.         self._taskqueue = Queue.Queue()
  65.         self._cache = { }
  66.         self._state = RUN
  67.         if processes is None:
  68.             
  69.             try:
  70.                 processes = cpu_count()
  71.             except NotImplementedError:
  72.                 processes = 1
  73.             except:
  74.                 None<EXCEPTION MATCH>NotImplementedError
  75.             
  76.  
  77.         None<EXCEPTION MATCH>NotImplementedError
  78.         self._pool = []
  79.         for i in range(processes):
  80.             w = self.Process(target = worker, args = (self._inqueue, self._outqueue, initializer, initargs))
  81.             self._pool.append(w)
  82.             w.name = w.name.replace('Process', 'PoolWorker')
  83.             w.daemon = True
  84.             w.start()
  85.         
  86.         self._task_handler = threading.Thread(target = Pool._handle_tasks, args = (self._taskqueue, self._quick_put, self._outqueue, self._pool))
  87.         self._task_handler.daemon = True
  88.         self._task_handler._state = RUN
  89.         self._task_handler.start()
  90.         self._result_handler = threading.Thread(target = Pool._handle_results, args = (self._outqueue, self._quick_get, self._cache))
  91.         self._result_handler.daemon = True
  92.         self._result_handler._state = RUN
  93.         self._result_handler.start()
  94.         self._terminate = Finalize(self, self._terminate_pool, args = (self._taskqueue, self._inqueue, self._outqueue, self._pool, self._task_handler, self._result_handler, self._cache), exitpriority = 15)
  95.  
  96.     
  97.     def _setup_queues(self):
  98.         SimpleQueue = SimpleQueue
  99.         import queues
  100.         self._inqueue = SimpleQueue()
  101.         self._outqueue = SimpleQueue()
  102.         self._quick_put = self._inqueue._writer.send
  103.         self._quick_get = self._outqueue._reader.recv
  104.  
  105.     
  106.     def apply(self, func, args = (), kwds = { }):
  107.         '''
  108.         Equivalent of `apply()` builtin
  109.         '''
  110.         if not self._state == RUN:
  111.             raise AssertionError
  112.         return self.apply_async(func, args, kwds).get()
  113.  
  114.     
  115.     def map(self, func, iterable, chunksize = None):
  116.         '''
  117.         Equivalent of `map()` builtin
  118.         '''
  119.         if not self._state == RUN:
  120.             raise AssertionError
  121.         return self.map_async(func, iterable, chunksize).get()
  122.  
  123.     
  124.     def imap(self, func, iterable, chunksize = 1):
  125.         '''
  126.         Equivalent of `itertools.imap()` -- can be MUCH slower than `Pool.map()`
  127.         '''
  128.         if not self._state == RUN:
  129.             raise AssertionError
  130.         if chunksize == 1:
  131.             result = IMapIterator(self._cache)
  132.             (self._state == RUN, self._taskqueue.put)(((lambda .0: for i, x in .0:
  133. (result._job, i, func, (x,), { }))(enumerate(iterable)), result._set_length))
  134.             return result
  135.         if not chunksize > 1:
  136.             raise AssertionError
  137.         task_batches = Pool._get_tasks(func, iterable, chunksize)
  138.         result = IMapIterator(self._cache)
  139.         (self._taskqueue.put,)(((lambda .0: for i, x in .0:
  140. (result._job, i, mapstar, (x,), { }))(enumerate(task_batches)), result._set_length))
  141.         return (lambda .0: for chunk in .0:
  142. for item in chunk:
  143. item)(result)
  144.  
  145.     
  146.     def imap_unordered(self, func, iterable, chunksize = 1):
  147.         '''
  148.         Like `imap()` method but ordering of results is arbitrary
  149.         '''
  150.         if not self._state == RUN:
  151.             raise AssertionError
  152.         if chunksize == 1:
  153.             result = IMapUnorderedIterator(self._cache)
  154.             (self._state == RUN, self._taskqueue.put)(((lambda .0: for i, x in .0:
  155. (result._job, i, func, (x,), { }))(enumerate(iterable)), result._set_length))
  156.             return result
  157.         if not chunksize > 1:
  158.             raise AssertionError
  159.         task_batches = Pool._get_tasks(func, iterable, chunksize)
  160.         result = IMapUnorderedIterator(self._cache)
  161.         (self._taskqueue.put,)(((lambda .0: for i, x in .0:
  162. (result._job, i, mapstar, (x,), { }))(enumerate(task_batches)), result._set_length))
  163.         return (lambda .0: for chunk in .0:
  164. for item in chunk:
  165. item)(result)
  166.  
  167.     
  168.     def apply_async(self, func, args = (), kwds = { }, callback = None):
  169.         '''
  170.         Asynchronous equivalent of `apply()` builtin
  171.         '''
  172.         if not self._state == RUN:
  173.             raise AssertionError
  174.         result = ApplyResult(self._cache, callback)
  175.         self._taskqueue.put(([
  176.             (result._job, None, func, args, kwds)], None))
  177.         return result
  178.  
  179.     
  180.     def map_async(self, func, iterable, chunksize = None, callback = None):
  181.         '''
  182.         Asynchronous equivalent of `map()` builtin
  183.         '''
  184.         if not self._state == RUN:
  185.             raise AssertionError
  186.         if not hasattr(iterable, '__len__'):
  187.             iterable = list(iterable)
  188.         
  189.         if chunksize is None:
  190.             (chunksize, extra) = divmod(len(iterable), len(self._pool) * 4)
  191.             if extra:
  192.                 chunksize += 1
  193.             
  194.         
  195.         task_batches = Pool._get_tasks(func, iterable, chunksize)
  196.         result = MapResult(self._cache, chunksize, len(iterable), callback)
  197.         (self._taskqueue.put,)(((lambda .0: for i, x in .0:
  198. (result._job, i, mapstar, (x,), { }))(enumerate(task_batches)), None))
  199.         return result
  200.  
  201.     
  202.     def _handle_tasks(taskqueue, put, outqueue, pool):
  203.         thread = threading.current_thread()
  204.         for taskseq, set_length in iter(taskqueue.get, None):
  205.             i = -1
  206.             for i, task in enumerate(taskseq):
  207.                 if thread._state:
  208.                     debug('task handler found thread._state != RUN')
  209.                     break
  210.                 
  211.                 
  212.                 try:
  213.                     put(task)
  214.                 continue
  215.                 except IOError:
  216.                     debug('could not put task on queue')
  217.                     break
  218.                     continue
  219.                 
  220.  
  221.             elif set_length:
  222.                 debug('doing set_length()')
  223.                 set_length(i + 1)
  224.                 continue
  225.             None<EXCEPTION MATCH>IOError
  226.             continue
  227.             break
  228.         
  229.         
  230.         try:
  231.             debug('task handler sending sentinel to result handler')
  232.             outqueue.put(None)
  233.             debug('task handler sending sentinel to workers')
  234.             for p in pool:
  235.                 put(None)
  236.         except IOError:
  237.             debug('task handler got IOError when sending sentinels')
  238.  
  239.         debug('task handler exiting')
  240.  
  241.     _handle_tasks = staticmethod(_handle_tasks)
  242.     
  243.     def _handle_results(outqueue, get, cache):
  244.         thread = threading.current_thread()
  245.         while None:
  246.             
  247.             try:
  248.                 task = get()
  249.             except (IOError, EOFError):
  250.                 debug('result handler got EOFError/IOError -- exiting')
  251.                 return None
  252.  
  253.             if thread._state:
  254.                 if not thread._state == TERMINATE:
  255.                     raise AssertionError
  256.                 debug('result handler found thread._state=TERMINATE')
  257.                 break
  258.             
  259.             if task is None:
  260.                 debug('result handler got sentinel')
  261.                 break
  262.             
  263.             (job, i, obj) = task
  264.             
  265.             try:
  266.                 cache[job]._set(i, obj)
  267.             continue
  268.             except KeyError:
  269.                 continue
  270.             
  271.  
  272.             while cache and thread._state != TERMINATE:
  273.                 
  274.                 try:
  275.                     task = get()
  276.                 except (IOError, EOFError):
  277.                     None<EXCEPTION MATCH>KeyError
  278.                     None<EXCEPTION MATCH>KeyError
  279.                     debug('result handler got EOFError/IOError -- exiting')
  280.                     return None
  281.  
  282.                 if task is None:
  283.                     debug('result handler ignoring extra sentinel')
  284.                     continue
  285.                 
  286.                 (job, i, obj) = task
  287.                 
  288.                 try:
  289.                     cache[job]._set(i, obj)
  290.                 continue
  291.                 except KeyError:
  292.                     continue
  293.                 
  294.  
  295.                 None<EXCEPTION MATCH>KeyError
  296.             if hasattr(outqueue, '_reader'):
  297.                 debug('ensuring that outqueue is not full')
  298.                 
  299.                 try:
  300.                     for i in range(10):
  301.                         if not outqueue._reader.poll():
  302.                             break
  303.                         
  304.                         get()
  305.                 except (IOError, EOFError):
  306.                     pass
  307.                 except:
  308.                     None<EXCEPTION MATCH>(IOError, EOFError)
  309.                 
  310.  
  311.         None<EXCEPTION MATCH>(IOError, EOFError)
  312.         debug('result handler exiting: len(cache)=%s, thread._state=%s', len(cache), thread._state)
  313.  
  314.     _handle_results = staticmethod(_handle_results)
  315.     
  316.     def _get_tasks(func, it, size):
  317.         it = iter(it)
  318.         while None:
  319.             x = tuple(itertools.islice(it, size))
  320.             if not x:
  321.                 return None
  322.             yield (func, x)
  323.             continue
  324.             return None
  325.  
  326.     _get_tasks = staticmethod(_get_tasks)
  327.     
  328.     def __reduce__(self):
  329.         raise NotImplementedError('pool objects cannot be passed between processes or pickled')
  330.  
  331.     
  332.     def close(self):
  333.         debug('closing pool')
  334.         if self._state == RUN:
  335.             self._state = CLOSE
  336.             self._taskqueue.put(None)
  337.         
  338.  
  339.     
  340.     def terminate(self):
  341.         debug('terminating pool')
  342.         self._state = TERMINATE
  343.         self._terminate()
  344.  
  345.     
  346.     def join(self):
  347.         debug('joining pool')
  348.         if not self._state in (CLOSE, TERMINATE):
  349.             raise AssertionError
  350.         self._task_handler.join()
  351.         self._result_handler.join()
  352.         for p in self._pool:
  353.             p.join()
  354.         
  355.  
  356.     
  357.     def _help_stuff_finish(inqueue, task_handler, size):
  358.         debug('removing tasks from inqueue until task handler finished')
  359.         inqueue._rlock.acquire()
  360.         while task_handler.is_alive() and inqueue._reader.poll():
  361.             inqueue._reader.recv()
  362.             time.sleep(0)
  363.  
  364.     _help_stuff_finish = staticmethod(_help_stuff_finish)
  365.     
  366.     def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, task_handler, result_handler, cache):
  367.         debug('finalizing pool')
  368.         task_handler._state = TERMINATE
  369.         taskqueue.put(None)
  370.         debug('helping task handler/workers to finish')
  371.         cls._help_stuff_finish(inqueue, task_handler, len(pool))
  372.         if not result_handler.is_alive() and len(cache) == 0:
  373.             raise AssertionError
  374.         result_handler._state = TERMINATE
  375.         outqueue.put(None)
  376.         if pool and hasattr(pool[0], 'terminate'):
  377.             debug('terminating workers')
  378.             for p in pool:
  379.                 p.terminate()
  380.             
  381.         
  382.         debug('joining task handler')
  383.         task_handler.join(1e+100)
  384.         debug('joining result handler')
  385.         result_handler.join(1e+100)
  386.         if pool and hasattr(pool[0], 'terminate'):
  387.             debug('joining pool workers')
  388.             for p in pool:
  389.                 p.join()
  390.             
  391.         
  392.  
  393.     _terminate_pool = classmethod(_terminate_pool)
  394.  
  395.  
  396. class ApplyResult(object):
  397.     
  398.     def __init__(self, cache, callback):
  399.         self._cond = threading.Condition(threading.Lock())
  400.         self._job = job_counter.next()
  401.         self._cache = cache
  402.         self._ready = False
  403.         self._callback = callback
  404.         cache[self._job] = self
  405.  
  406.     
  407.     def ready(self):
  408.         return self._ready
  409.  
  410.     
  411.     def successful(self):
  412.         if not self._ready:
  413.             raise AssertionError
  414.         return self._success
  415.  
  416.     
  417.     def wait(self, timeout = None):
  418.         self._cond.acquire()
  419.         
  420.         try:
  421.             if not self._ready:
  422.                 self._cond.wait(timeout)
  423.         finally:
  424.             self._cond.release()
  425.  
  426.  
  427.     
  428.     def get(self, timeout = None):
  429.         self.wait(timeout)
  430.         if not self._ready:
  431.             raise TimeoutError
  432.         self._ready
  433.         if self._success:
  434.             return self._value
  435.         raise self._value
  436.  
  437.     
  438.     def _set(self, i, obj):
  439.         (self._success, self._value) = obj
  440.         if self._callback and self._success:
  441.             self._callback(self._value)
  442.         
  443.         self._cond.acquire()
  444.         
  445.         try:
  446.             self._ready = True
  447.             self._cond.notify()
  448.         finally:
  449.             self._cond.release()
  450.  
  451.         del self._cache[self._job]
  452.  
  453.  
  454.  
  455. class MapResult(ApplyResult):
  456.     
  457.     def __init__(self, cache, chunksize, length, callback):
  458.         ApplyResult.__init__(self, cache, callback)
  459.         self._success = True
  460.         self._value = [
  461.             None] * length
  462.         self._chunksize = chunksize
  463.         if chunksize <= 0:
  464.             self._number_left = 0
  465.             self._ready = True
  466.         else:
  467.             self._number_left = length // chunksize + bool(length % chunksize)
  468.  
  469.     
  470.     def _set(self, i, success_result):
  471.         (success, result) = success_result
  472.         if success:
  473.             self._value[i * self._chunksize:(i + 1) * self._chunksize] = result
  474.             self._number_left -= 1
  475.             if self._number_left == 0:
  476.                 if self._callback:
  477.                     self._callback(self._value)
  478.                 
  479.                 del self._cache[self._job]
  480.                 self._cond.acquire()
  481.                 
  482.                 try:
  483.                     self._ready = True
  484.                     self._cond.notify()
  485.                 finally:
  486.                     self._cond.release()
  487.  
  488.             
  489.         else:
  490.             self._success = False
  491.             self._value = result
  492.             del self._cache[self._job]
  493.             self._cond.acquire()
  494.             
  495.             try:
  496.                 self._ready = True
  497.                 self._cond.notify()
  498.             finally:
  499.                 self._cond.release()
  500.  
  501.  
  502.  
  503.  
  504. class IMapIterator(object):
  505.     
  506.     def __init__(self, cache):
  507.         self._cond = threading.Condition(threading.Lock())
  508.         self._job = job_counter.next()
  509.         self._cache = cache
  510.         self._items = collections.deque()
  511.         self._index = 0
  512.         self._length = None
  513.         self._unsorted = { }
  514.         cache[self._job] = self
  515.  
  516.     
  517.     def __iter__(self):
  518.         return self
  519.  
  520.     
  521.     def next(self, timeout = None):
  522.         self._cond.acquire()
  523.         
  524.         try:
  525.             item = self._items.popleft()
  526.         except IndexError:
  527.             if self._index == self._length:
  528.                 raise StopIteration
  529.             self._index == self._length
  530.             self._cond.wait(timeout)
  531.             
  532.             try:
  533.                 item = self._items.popleft()
  534.             except IndexError:
  535.                 if self._index == self._length:
  536.                     raise StopIteration
  537.                 self._index == self._length
  538.                 raise TimeoutError
  539.             except:
  540.                 None<EXCEPTION MATCH>IndexError
  541.             
  542.  
  543.             None<EXCEPTION MATCH>IndexError
  544.         finally:
  545.             self._cond.release()
  546.  
  547.         (success, value) = item
  548.         if success:
  549.             return value
  550.         raise value
  551.  
  552.     __next__ = next
  553.     
  554.     def _set(self, i, obj):
  555.         self._cond.acquire()
  556.         
  557.         try:
  558.             if self._index == i:
  559.                 self._items.append(obj)
  560.                 self._index += 1
  561.                 while self._index in self._unsorted:
  562.                     obj = self._unsorted.pop(self._index)
  563.                     self._items.append(obj)
  564.                     self._index += 1
  565.                     continue
  566.                     self
  567.                 self._cond.notify()
  568.             else:
  569.                 self._unsorted[i] = obj
  570.             if self._index == self._length:
  571.                 del self._cache[self._job]
  572.         finally:
  573.             self._cond.release()
  574.  
  575.  
  576.     
  577.     def _set_length(self, length):
  578.         self._cond.acquire()
  579.         
  580.         try:
  581.             self._length = length
  582.             if self._index == self._length:
  583.                 self._cond.notify()
  584.                 del self._cache[self._job]
  585.         finally:
  586.             self._cond.release()
  587.  
  588.  
  589.  
  590.  
  591. class IMapUnorderedIterator(IMapIterator):
  592.     
  593.     def _set(self, i, obj):
  594.         self._cond.acquire()
  595.         
  596.         try:
  597.             self._items.append(obj)
  598.             self._index += 1
  599.             self._cond.notify()
  600.             if self._index == self._length:
  601.                 del self._cache[self._job]
  602.         finally:
  603.             self._cond.release()
  604.  
  605.  
  606.  
  607.  
  608. class ThreadPool(Pool):
  609.     from dummy import Process
  610.     
  611.     def __init__(self, processes = None, initializer = None, initargs = ()):
  612.         Pool.__init__(self, processes, initializer, initargs)
  613.  
  614.     
  615.     def _setup_queues(self):
  616.         self._inqueue = Queue.Queue()
  617.         self._outqueue = Queue.Queue()
  618.         self._quick_put = self._inqueue.put
  619.         self._quick_get = self._outqueue.get
  620.  
  621.     
  622.     def _help_stuff_finish(inqueue, task_handler, size):
  623.         inqueue.not_empty.acquire()
  624.         
  625.         try:
  626.             inqueue.queue.clear()
  627.             inqueue.queue.extend([
  628.                 None] * size)
  629.             inqueue.not_empty.notify_all()
  630.         finally:
  631.             inqueue.not_empty.release()
  632.  
  633.  
  634.     _help_stuff_finish = staticmethod(_help_stuff_finish)
  635.  
  636.